/*
 * Copyright 2021-2022 Open Kunlun Technology <https://www.openkunlun.io>
 */

package io.openkunlun.scaladsl.client

import akka.actor.{ ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import com.google.protobuf.ByteString
import io.openkunlun.scaladsl.model.{ State, StateItem }
import io.openkunlun.scaladsl.serialization.{ DaprObjectSerialization, JsonSerialization }
import io.openkunlun.scaladsl.util.Strings
import io.openkunlun.scaladsl.v1.StateOptions.StateConsistency
import io.openkunlun.scaladsl.v1.{ StateItem => DaprStateItem, _ }

import scala.concurrent.{ ExecutionContext, Future }

/**
 * Extension id used to obtain the [[DaprStateClient]] extension of an actor system.
 *
 * @author ericxin.
 */
private object DaprStateClient extends ExtensionId[DaprStateClient] with ExtensionIdProvider {

  /** The id under which the extension is looked up: this very object. */
  override def lookup: ExtensionId[DaprStateClient] = this

  /** Builds the extension instance for the given actor system. */
  override def createExtension(system: ExtendedActorSystem): DaprStateClient = {
    new DaprStateClient(system)
  }
}
/**
 * Akka extension that exposes the Dapr state operations (get, bulk get, delete, set and
 * transactional execute) as `Future`-returning methods on top of the client obtained from
 * `DaprGrpcClient(system)`.
 *
 * Every public method takes an implicit [[DaprObjectSerialization]]; when none is supplied the
 * instance created by `JsonSerialization(system)` is used as the default.
 */
private class DaprStateClient(system: ExtendedActorSystem) extends Extension {
  private val serialization: DaprObjectSerialization = JsonSerialization(system)
  private val daprClient = DaprGrpcClient(system)

  /**
   * Reads a single key from a state store.
   *
   * @tparam T type the returned bytes are deserialized into
   * @param action  store name, key, consistency name and metadata of the lookup
   * @param headers extra headers added to the call; when empty the plain client call is used
   * @return the deserialized value with the requested key, the etag (`None` when
   *         `Strings.nonEmpty` rejects it) and the response metadata
   */
  def getState[T: Manifest](action: GetStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[State[T]] = {
    val request = GetStateRequest(action.storeName, action.key, stateConsistency(action.consistency), action.metadata)
    val response =
      if (headers.isEmpty) {
        daprClient.client.getState(request)
      } else {
        // Thread the builder through the headers instead of mutating a local var.
        val builder = headers.foldLeft(daprClient.client.getState()) {
          case (acc, (name, value)) => acc.addHeader(name, value)
        }
        builder.invoke(request)
      }
    response.map { result =>
      State(serialization.deserialize[T](result.data.toByteArray), action.key, Option(result.etag).filter(Strings.nonEmpty), result.metadata)
    }
  }

  /**
   * Reads several keys from a state store in one call.
   *
   * @tparam T type every returned item is deserialized into
   * @param action  store name, keys, parallelism and metadata of the lookup
   * @param headers extra headers added to the call; when empty the plain client call is used
   * @return one [[State]] per item of the response, in response order
   */
  def getBulkState[T: Manifest](action: GetBulkStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Seq[State[T]]] = {
    val request = GetBulkStateRequest(action.storeName, action.keys, action.parallelism, action.metadata)
    val response =
      if (headers.isEmpty) {
        daprClient.client.getBulkState(request)
      } else {
        val builder = headers.foldLeft(daprClient.client.getBulkState()) {
          case (acc, (name, value)) => acc.addHeader(name, value)
        }
        builder.invoke(request)
      }
    response.map { result =>
      result.items.map(it => State(serialization.deserialize[T](it.data.toByteArray), it.key, Option(it.etag).filter(Strings.nonEmpty), it.metadata))
    }
  }

  /**
   * Deletes a single key from a state store.
   *
   * @param action  store name, key, optional etag and metadata of the deletion
   * @param headers extra headers added to the call; when empty the plain client call is used
   * @return a future completed with `()` once the call has succeeded
   */
  def deleteState[T: Manifest](action: DeleteStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] = {
    // TODO: state options are not forwarded yet; the `None` argument is presumably the options slot.
    val request = DeleteStateRequest(action.storeName, action.key, action.etag.map(it => Etag(it)), None, action.metadata)
    val response =
      if (headers.isEmpty) {
        daprClient.client.deleteState(request)
      } else {
        val builder = headers.foldLeft(daprClient.client.deleteState()) {
          case (acc, (name, value)) => acc.addHeader(name, value)
        }
        builder.invoke(request)
      }
    response.map(_ => ())
  }

  /**
   * Saves the given state items to a state store.
   *
   * @param action  store name and the items to save; each item is converted with
   *                `serializeStateItem`
   * @param headers extra headers added to the call; when empty the plain client call is used
   * @return a future completed with `()` once the call has succeeded
   */
  def setState[T: Manifest](action: SetStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] = {
    val request = SetStateRequest(action.storeName, action.states.map(serializeStateItem))
    val response =
      if (headers.isEmpty) {
        daprClient.client.setState(request)
      } else {
        val builder = headers.foldLeft(daprClient.client.setState()) {
          case (acc, (name, value)) => acc.addHeader(name, value)
        }
        builder.invoke(request)
      }
    response.map(_ => ())
  }

  /**
   * Sends a list of state operations to a state store as one `ExecuteStateRequest`.
   *
   * @param action  store name and the operations; each operation's state item is converted with
   *                `serializeStateItem`
   * @param headers extra headers added to the call; when empty the plain client call is used
   * @return a future completed with `()` once the call has succeeded
   */
  def executeState[T: Manifest](action: ExecuteStateAction, headers: Map[String, String] = Map.empty)(implicit ec: ExecutionContext, serialization: DaprObjectSerialization = serialization): Future[Unit] = {
    val operations = action.operations.map { it =>
      io.openkunlun.scaladsl.v1.TransactionalStateOperation(it.operation, Some(serializeStateItem(it.state)))
    }
    val request = ExecuteStateRequest(action.storeName, operations)
    val response =
      if (headers.isEmpty) {
        daprClient.client.executeState(request)
      } else {
        val builder = headers.foldLeft(daprClient.client.executeState()) {
          case (acc, (name, value)) => acc.addHeader(name, value)
        }
        builder.invoke(request)
      }
    response.map(_ => ())
  }

  /**
   * Maps a consistency name to its `StateConsistency` value, ignoring case.
   *
   * Names matching neither `CONSISTENCY_UNSPECIFIED` nor `CONSISTENCY_EVENTUAL` fall back to
   * `CONSISTENCY_STRONG`.
   */
  private def stateConsistency(consistency: String) = {
    if (consistency.equalsIgnoreCase(StateConsistency.CONSISTENCY_UNSPECIFIED.name)) {
      StateConsistency.CONSISTENCY_UNSPECIFIED
    } else if (consistency.equalsIgnoreCase(StateConsistency.CONSISTENCY_EVENTUAL.name)) {
      StateConsistency.CONSISTENCY_EVENTUAL
    } else StateConsistency.CONSISTENCY_STRONG
  }

  /**
   * Converts a model [[StateItem]] into the wire-level state item, serializing its value with the
   * implicit serialization and wrapping the optional etag.
   */
  private def serializeStateItem(item: StateItem)(implicit serialization: DaprObjectSerialization): io.openkunlun.scaladsl.v1.StateItem = {
    // TODO: state options are not forwarded yet; the trailing `None` is presumably the options slot.
    val payload = ByteString.copyFrom(serialization.serialize(item.value))
    DaprStateItem(item.key, payload, item.etag.map(it => Etag(it)), item.metadata, None)
  }
}
